Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for reverse binding between PUB and SUB #149

Merged
merged 5 commits into from
Dec 6, 2021

Conversation

dimitar-petrov
Copy link
Contributor

@dimitar-petrov dimitar-petrov commented Dec 3, 2021

In order to provide functionality required by following example #147 a patch to PUB and SUB sockets should be implemented.

Quote from dev on discord:

subscriber socket should keep list of his subscriptions and send it to PUB socket when it connects

Todos

  • add forwarder_device.rs example
  • add subs HashSet
  • convert peer_connected to async fn and update all referencing structs
  • sent all subscriptions from the HashSet on peer connect
  • refactor
  • fix code format and style

Any comments regarding possible solutions are appreciated.
Thanks.

@Alexei-Kornienko
Copy link
Collaborator

Basically we need to implement 2 things:

  1. https://github.com/zeromq/zmq.rs/blob/master/src/sub.rs#L23 Here we should add a HashSet to keep subscriptions and add/remove from it with each subscribe unsubscribe call (https://github.com/zeromq/zmq.rs/blob/master/src/sub.rs#L33)

  2. this part is more complicated. I Have a struct called backend to encapsulate most of the socket logic related to network. Backend provides call on_peer_connected which is called when a new connection is established and handshake completes (https://github.com/zeromq/zmq.rs/blob/master/src/backend.rs#L101). SUB now uses GenericSocketBackend We need to replace it with a custom backend for SUB socket that will use HashSet created on step 1 to send current subscriptions to PUB socket when it first connects. This can be done by either copy/pasting GenericSocketBackend impl to sub socket and customizing it or by using composition of structs

- remove SubSocket dependency on GenericSocketBackend
- add HashSet to store unique subscriptions
@dimitar-petrov
Copy link
Contributor Author

Couple of questions:

  1. How to access SubSocket.subs (HashSet) from MultiPeerBackend.peer_connected, since it is Composite?
  2. What about putting the HashSet as property of SubSocketBackend struct and acessing it as self.backend.subs from subscribe and unsubscribe?

@Alexei-Kornienko
Copy link
Collaborator

  1. Yeah you might be right. I guess in such case it might be easier to put it in backend itself. would need to use a mutex for it but it should not be a problem cause it's not on a fast path of the code.

Another issue I see now is that peer_connected is not async. It means that you want be able to send messages to pub socket and await them. I see 2 possible options:

  • we have try_send method that relies on eventual flush (However It may not be 100% reliable)
  • make peer_connected async - should be easy except that all other places would also be updated

src/sub.rs Outdated Show resolved Hide resolved
- move subs HashSet to SubSocketBackend and wrap it in Mutex
- patch peer_connected and send all subscriptions to the
  peer on_connect
- convert peer_connected to async function and update
  all structs that depend on it
@Alexei-Kornienko
Copy link
Collaborator

Mostly LGTM. I would suggest minor refactoring + maybe run a fmt/clippy to make sure that code looks pretty

src/sub.rs Outdated Show resolved Hide resolved
src/sub.rs Outdated Show resolved Hide resolved
- add create_subs_message associated function no `SubSocketBackend`
- refactor `peer_connected` to avoid extra copy on iter of `Mutex<HashSet>`
- use send_queue from `FramedIo` instead of getting a peer object
- run `cargo clippy --fix`
@Alexei-Kornienko
Copy link
Collaborator

Overall LGTM. Could you please fix formatting issues reported by CI so we can get it merged?

@dimitar-petrov
Copy link
Contributor Author

Sure I will take care of formatting and push:

I have two questions in the mean time:

  1. Why this snippet is not working. For some reason it is not unlocking the mutex and compiler complains due to awaiting latter in the function.
let subs = self.subs.lock();
  let subs_msgs: Vec<ZmqMessage> = subs.iter().map(
      |x| SubSocketBackend::create_subs_message(
	  &x, SubBackendMsgType::SUBSCRIBE)).collect();
  drop(subs);
  1. And I noticed that if I disconnect and reconnect the sub server process, a client with the following code does not detect anytihg and is trying to push messages.
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    println!("Start client");
    let mut socket = zeromq::PubSocket::new();
    socket.connect("tcp://127.0.0.1:30001").await.expect("Failed to connect");
    sleep(Duration::from_millis(100)).await;

    let mut msg = zeromq::ZmqMessage::from("test");
    msg.prepend(&"topic".into());

    loop {
	println!("Send ...");
	println!("{:?}", socket.send(msg.clone()).await.unwrap());
	sleep(Duration::from_millis(1000)).await
    }

    Ok(())
}

@Alexei-Kornienko
Copy link
Collaborator

  1. Seems like compiler issue. Following code does work:
        let subs_msgs = {
            let subs = self.subs.lock();
            let subs_msgs: Vec<ZmqMessage> = subs.iter().map(
                  |x| SubSocketBackend::create_subs_message(
                      &x, SubBackendMsgType::SUBSCRIBE)).collect();
            drop(subs);
            subs_msgs
        };

So in this case I would say that compiler rejects correct code.

  1. That's another open issue I have in the library (reconnect #143 ). Code currently doesn't handle reconnect on it's own as it should according to ZMQ spec. However IMHO it should be fixed via separate MR.

- `cargo fmt --all -- --check`
@dimitar-petrov
Copy link
Contributor Author

@Alexei-Kornienko, let me know if you are fine with the function/variable naming conventions I used.


for message in subs_msgs.iter() {
send_queue
.send(Message::Message(message.clone()))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just a minor comment. There is no need to clone message here. We are sending each message only once. You can use into_iter and send message by value

@Alexei-Kornienko
Copy link
Collaborator

LGTM.

@Alexei-Kornienko Alexei-Kornienko merged commit 9a24dd6 into zeromq:master Dec 6, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants